4e9b3a55f781cde7760c71842b2884a6eaf75819,src/org/jgroups/protocols/pbcast/NAKACK2.java,NAKACK2,removeAndPassUp,#Table#Address#boolean#AsciiString#,879
Before Change
// Delivered messages are removed from the table only when discard_delivered_msgs is set and this is not a loopback
boolean remove_msgs=discard_delivered_msgs && !loopback;
// A single multicast batch (constructed with max_msg_batch_size) is created up front;
// batch_creator hands out that same instance on every call
MessageBatch batch=new MessageBatch(max_msg_batch_size).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
Supplier<MessageBatch> batch_creator=() -> batch;
// Keep removing and delivering until a pass leaves the batch empty
while(true) {
// The batch is reused, so reset it before each pass
batch.reset();
// We're removing as many msgs as possible and set processing to false (if null) *atomically* (wrt add())
// Don't include DUMMY and OOB_DELIVERED messages in the removed set
// NOTE(review): 'processing' is declared outside this fragment - confirm what it refers to
buf.removeMany(processing, remove_msgs, max_msg_batch_size,
no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
batch_creator, BATCH_ACCUMULATOR, BATCH_VALIDATOR);
if(batch.isEmpty()) {
// Nothing was removed in this pass: run the rebroadcast check if rebroadcasting is set, then leave the method
if(rebroadcasting)
checkForRebroadcasts();
return;
}
// Deliver what was collected in this pass, then loop to try again
deliverBatch(batch);
}
}
After Change
* Benefit: fewer threads are blocked on the same lock; these threads can instead be returned to the thread pool
*/
protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) {
    // Bump the table's shared counter. A non-zero previous value makes this call return at once,
    // leaving its increment on the counter (presumably for the caller that is already draining - confirm).
    final AtomicInteger pending=buf.getAdders();
    final int previous=pending.getAndIncrement();
    if(previous != 0)
        return;

    // Delivered messages are removed from the table only when discarding is enabled and this is not a loopback
    final boolean purge=discard_delivered_msgs && !loopback;

    // One multicast batch, constructed with the table's current size, is reused for every pass
    final MessageBatch reusable=new MessageBatch(buf.size())
      .dest(null).sender(sender).clusterName(cluster_name).multicast(true);
    final Supplier<MessageBatch> reusable_supplier=() -> reusable;

    for(;;) {
        try {
            reusable.reset();
            // Don't include DUMMY and OOB_DELIVERED messages in the removed set
            buf.removeMany(purge, 0, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
                           reusable_supplier, BATCH_ACCUMULATOR);
        }
        catch(Throwable t) {
            // A failed removal is logged only; whatever is in the batch is still delivered below
            log.error("failed removing messages from table for " + sender, t);
        }

        if(!reusable.isEmpty())
            deliverBatch(reusable);

        // Run another pass for as long as the counter has not dropped back to zero
        if(pending.decrementAndGet() == 0)
            break;
    }

    if(rebroadcasting)
        checkForRebroadcasts();
}